data-clean

本文最后更新于:2024年2月8日 中午

Tools

pandas

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# Conditional assignment and selection with .loc / where.
df.loc[df.AAA >= 5, "BBB"] = -1  # overwrite BBB with -1 on the rows where AAA >= 5
df.loc[(df["BBB"] < 25) & (df["CCC"] >= -40), "AAA"]  # selection only: displays the matching AAA values, assigns nothing
df.loc[df.AAA >= 5, ["BBB", "CCC"]] = 555  # assign the same value to several columns at once
df.where(df_mask, -1000)  # keep cells where df_mask is True, put -1000 where it is False (result is not stored here)
df["logic"] = np.where(df["AAA"] > 5, "high", "low")  # new column: "high" where AAA > 5, otherwise "low"



# Slicing rows: the filter expression can be written directly inside the indexing brackets.
df[(df.AAA <= 6) & (df.index.isin([0, 2, 4]))]  # df.index addresses the row labels; isin() keeps only the listed labels
df2.iloc[1:3]  # Position-oriented
df2.loc[1:3]  # Label-oriented

# Build new columns through a mapping. Efficiently and dynamically creating new columns using DataFrame.map (previously named applymap)
source_cols = df.columns  # labels of all columns of df (a pandas Index, not only the first column)
new_cols = [str(x) + "_cat" for x in source_cols]  # one "<label>_cat" name per source column
categories = {1: "Alpha", 2: "Beta", 3: "Charlie"}  # lookup table: code -> category name
df[new_cols] = df[source_cols].map(categories.get)  # element-wise lookup; dict.get gives None for codes missing from the table

#有一个重要的函数apply可以把所有的函数运用在特定的位置上

# merge
pd.merge(left,right,how="left",on=[])  # how="left" joins onto the keys of `left`; `on` names the key column(s) to join by (left empty here as a placeholder)

# Arithmetic .. Performing arithmetic with a MultiIndex that needs broadcasting
cols = pd.MultiIndex.from_tuples(
[(x, y) for x in ["A", "B", "C"] for y in ["O", "I"]]
)
# The comprehension pairs every letter with "O" and "I", so `cols` is a two-level index of six tuples.

# group
### 这里可以按照某一个特定的id做一次聚类,再对里面的数据完成操作
def GrowUp(x):
    """Collapse a frame of animals into one "grown-up" summary row.

    Every weight is scaled by a factor chosen from the row's size
    (S -> 1.5, M -> 1.25, L -> 1.0) and the scaled weights are averaged
    over all rows of ``x``.

    Args:
        x: DataFrame with at least the columns ``size`` and ``weight``.

    Returns:
        pd.Series indexed by ``size``, ``weight`` and ``adult`` holding
        ``"L"``, the scaled average weight and ``True``.
    """
    avg_weight = sum(x[x["size"] == "S"].weight * 1.5)
    avg_weight += sum(x[x["size"] == "M"].weight * 1.25)
    avg_weight += sum(x[x["size"] == "L"].weight)
    # Averaged over every row of x: rows with any other size add nothing
    # to the sum but still count in the divisor.
    avg_weight /= len(x)
    return pd.Series(["L", avg_weight, True], index=["size", "weight", "adult"])

pd.Series([i / 100.0 for i in range(1, 11)])  # sample per-period returns 0.01 ... 0.10 (result is not stored)


def cum_ret(x, y):
    """Compound the running value ``x`` by one period's return ``y``."""
    return x * (1 + y)


def red(x):
    """Apply ``cum_ret`` cumulatively over every return in ``x``.

    Args:
        x: iterable of per-period returns.

    Returns:
        The compounded growth factor, starting from 1.0 (so an empty
        iterable gives 1.0).
    """
    # Local import: no top-level ``import functools`` is visible in this file.
    import functools

    return functools.reduce(cum_ret, x, 1.0)

# Note: transform receives one column of a group at a time as its argument; sort_values orders rows by the column(s) named in its argument.
df["Counts"] = df.groupby(["Color"]).transform(len)  # len is evaluated per Color group and broadcast back to every row of that group
# NOTE(review): assumes df has exactly one column besides "Color"; with more, transform returns several columns - confirm.

df["beyer_shifted"] = df.groupby(level=0)["beyer"].shift(1)  # shift(1) moves each value down by one row within its group
df.groupby(level=0)  # groups by the first level of the index (not the first column); result is not stored




数据清洗

1
2
3
4
"\n".join(list) #以 \n 作为分隔符,把 list 中的各个字符串拼接成一个新的字符串
[::-1]#切片:从后向前遍历(得到逆序的序列)
[5::6]#切片:从下标5开始,每隔6个元素取一个(步长为6)

simpy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import simpy

def main():
    """Run the traffic-light process until simulation time 120, then report completion."""
    env = simpy.Environment()
    # Registering the generator only schedules it; the simulation advances in run().
    env.process(traffic_light(env))
    env.run(until=120)
    print('simulation done')

def traffic_light(env):
    """Cycle a traffic light forever: green for 30, yellow for 5, red for 20.

    This generator is meant to be registered as a process. ``env`` is passed
    in so the light runs in that environment rather than some other one.
    Each ``yield`` suspends the generator at that point; it continues from
    the same place the next time it is resumed.

    Args:
        env: environment providing ``now`` and ``timeout(delay)``.

    Yields:
        Whatever ``env.timeout`` returns for the current phase.
    """
    while True:
        print(f'light green at :{env.now}')
        yield env.timeout(30)
        print(f'light yellow at :{env.now}')
        yield env.timeout(5)
        print(f'light red at :{env.now}')
        yield env.timeout(20)

# Run the simulation only when this file is executed as a script.
if __name__ == '__main__':
    main()


def example(env):
    """Wait one time unit on a timeout that carries a value, then print it.

    Args:
        env: environment providing ``now`` and ``timeout(delay, value)``.

    Yields:
        The timeout event; the value sent back into the generator when it is
        resumed is bound to ``value``.
    """
    # env.timeout is already bound to env, so env must not be passed again:
    # env.timeout(env, delay=1, value=42) would supply `delay` twice.
    value = yield env.timeout(delay=1, value=42)
    print('now=%d,value=%d' % (env.now, value))
env = simpy.Environment()
p = env.process(example(env))  # only registers the process; this snippet never calls env.run()

generator(生成器)是一种迭代器:一次只产生一个值,并且只能遍历一次,用过的值不会被保留,因此无法重复使用。yield 是一个用法与 return 类似的关键字,区别在于含有 yield 的函数返回的是一个生成器。调用包含 yield 的函数只会得到一个生成器,函数体中的代码此时并没有运行;开始迭代生成器后,每次遇到 yield,函数会暂停并保存当前所有的运行信息(保留局部变量),返回 yield 后面的值,并在下一次迭代时从暂停的位置继续运行,直到生成器被全部遍历完。

1
2
3
4
5
6
7
8
def create_generator():
    """Yield the squares of 0, 1 and 2, one value per iteration."""
    mylist = range(3)
    for i in mylist:
        yield i*i
mygenerator = create_generator()
# Iterating drives the generator; each pass prints the next yielded value.
for i in mygenerator:
    print(i)
env.run(until=10)  # run the simulation until simulation time 10
Environment.active_process:获取当前执行的进程,只有在进程函数里能获取到该信息
1
2
3
4
5
6
7
8
9
10
import simpy
def subfunc(env):
    """Print the process that ``env`` reports as currently active.

    This is a plain helper called from inside a process function. Processes
    are created with ``env.process`` and executed by ``env.run``.
    """
    print(env.active_process)
def proc(env):
    """Process generator: print the active process, directly and via subfunc, once per time unit."""
    while True:
        print(env.active_process)
        subfunc(env)
        # A process has to be a generator: yielding the timeout suspends it
        # until that event has been processed.
        yield env.timeout(1)
env = simpy.Environment()
p1 = env.process(proc(env))  # registers proc as a process; this snippet never calls env.run()
#### event basic:事件有 3 种状态:1. 将要发生(triggered = True),已放入事件队列中;2. 可能发生(triggered = False);3. 已经发生(processed = True),已被处理并从队列中弹出。Event.callbacks:回调列表,只要 event 还没有被处理,就可以继续添加回调;回调函数保存在 Event.callbacks 列表中,被调用时以该 event 作为参数。事件被触发后,结果可能是成功,也可能是失败。Event.succeed(value=None):触发 event 并将其标记为成功。Event.fail(exception):触发 event 并将其标记为失败,同时传入失败原因(异常)。Event.trigger(event):触发 event 的通用方法,沿用所给 event 的状态和值(成功或者失败)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import simpy
class School:
    """Three pupil processes wait on a bell process that fires ``class_end`` twice.

    The class is named ``School`` because that is the name the snippet
    instantiates right after the definition.

    Attributes are set in ``__init__``: ``env`` (the environment),
    ``class_end`` (the event pupils currently wait on), ``pupil_procs`` and
    ``bell_procs`` (the registered processes).
    """

    def __init__(self, env):
        self.env = env
        self.class_end = env.event()
        # Create three pupil processes and one bell process.
        self.pupil_procs = [env.process(self.pupil()) for i in range(3)]
        self.bell_procs = env.process(self.bell())

    def bell(self):
        """Ring twice: wait 45, trigger ``class_end``, then replace it with a new event."""
        for i in range(2):  # (1) bell loop
            yield self.env.timeout(45)  # (2) wait 45
            self.class_end.succeed()  # (3) trigger class_end and mark it successful
            self.class_end = self.env.event()  # (4) create a new class_end event

    def pupil(self):
        """Twice: print the pupil marker, then wait for the current ``class_end``."""
        for i in range(2):  # (5) pupil loop
            print(r'\0/', end=' ')  # (6) print \0/
            yield self.class_end  # (7) suspend until the class_end event is handled
# The environment has to exist before it is handed to School.
env = simpy.Environment()
school = School(env)
env.run()

# charge & run
class Car(object):
    """Car that alternates forever between charging (5 time units) and driving (2).

    ``action`` holds the process created for ``run()`` so that other code can
    reach it, e.g. to interrupt it. ``run()`` in this version contains no
    handler for such an interrupt.
    """

    def __init__(self, env):
        self.env = env
        # Start the run process everytime an instance is created.
        self.action = env.process(self.run())

    def run(self):
        """Loop: park and charge, then drive."""
        while True:
            print('Start parking and charging at %d' % self.env.now)
            charge_duration = 5
            # We yield the process that process() returns
            # to wait for it to finish
            yield self.env.process(self.charge(charge_duration))

            # The charge process has finished and
            # we can start driving again.
            print('Start driving at %d' % self.env.now)
            trip_duration = 2
            yield self.env.timeout(trip_duration)

    def charge(self, duration):
        """Sub-process that lasts ``duration`` time units."""
        # The yield is what makes charge() a generator, so it can be wrapped
        # in a process of its own.
        yield self.env.timeout(duration)


def driver(env, car):
    """Wait 3 time units, then interrupt the car's ``action`` process.

    Module-level function: it takes the environment and a car, not ``self``.
    """
    yield env.timeout(3)
    car.action.interrupt()  # interrupt the process
#需要n
import simpy
env = simpy.Environment()
car = Car(env)  # constructing the car already registers its run() process (see Car.__init__)
env.run(until=15)  # run until simulation time 15

### !!!! 注意:创建进程时,只需要把一个函数体内含有 yield 的函数(生成器函数)交给 env.process() 即可
#### 采用 interrupt 函数中断进程

class Car(object):
    """Car that charges (5) and drives (2) forever and tolerates being interrupted while charging.

    ``action`` holds the process created for ``run()``; interrupting it
    during charging makes the car give up charging and start driving.
    """

    def __init__(self, env):
        self.env = env
        self.action = env.process(self.run())

    def run(self):
        """Loop: park and charge (possibly cut short by an interrupt), then drive."""
        while True:
            print('Start parking and charging at %d' % self.env.now)
            charge_duration = 5
            # We may get interrupted while charging the battery
            try:
                yield self.env.process(self.charge(charge_duration))
            except simpy.Interrupt:
                # When we received an interrupt, we stop charging and
                # switch to the "driving" state
                print('Was interrupted. Hope, the battery is full enough ...')

            print('Start driving at %d' % self.env.now)
            trip_duration = 2
            yield self.env.timeout(trip_duration)

    def charge(self, duration):
        """Sub-process that lasts ``duration`` time units."""
        yield self.env.timeout(duration)

def example(env):
    """Wait on an explicitly constructed Timeout (delay 1, value 42) and print what it delivers.

    Args:
        env: the environment the Timeout is created in; ``env.now`` is read
            for the printout.
    """
    event = simpy.events.Timeout(env, delay=1, value=42)
    # The result of the yield expression is bound to `value` when the
    # generator is resumed.
    value = yield event
    print('now=%d, value=%d' % (env.now, value))
env = simpy.Environment()
example_gen = example(env)
p = simpy.events.Process(env, example_gen)  # wrap the generator in a Process by calling the class directly
env.run()  # the `value` given to the event is what the `yield` expression returns inside example()

# run同时可以传递一个process进去此时run会返回相关的函数的return值

peek() returns the time of the next scheduled event or infinity (float('inf')) if no future events are scheduled.

step() processes the next scheduled event. It raises an EmptySchedule exception if no event is available.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
until = 10
# Step through the schedule by hand while the next event lies before `until`.
while env.peek() < until:
    env.step()

def subfunc(env):
    """Print the process that ``env`` reports as currently active."""
    print(env.active_process)  # the process being executed right now (p1 in this example)

def my_proc(env):
    """Process generator: print the active process, directly and via subfunc, once per time unit."""
    while True:
        print(env.active_process)  # the process being executed right now (p1 in this example)
        subfunc(env)
        yield env.timeout(1)

env = simpy.Environment()
p1 = env.process(my_proc(env))
env.active_process  # None
env.step()  # process the next scheduled event

# callback list
import simpy

def my_callback(event):
    """Callback for an event's ``callbacks`` list: print the event it is called with."""
    print('Called back from', event)

env = simpy.Environment()
event = env.event()  # create a bare event object that can then be configured and used
event.callbacks.append(my_callback)  # register the callback on the event
event.callbacks  # inspect the callback list


class School:
    """Three pupil processes wait on a bell process that fires ``class_ends`` twice.

    Attributes are set in ``__init__``: ``env`` (the environment),
    ``class_ends`` (the event pupils currently wait on), ``pupil_procs`` and
    ``bell_proc`` (the registered processes).
    """

    def __init__(self, env):
        self.env = env
        # The event object is created once here; afterwards the code works
        # with this attribute (bell() swaps in a new event after each ring).
        self.class_ends = env.event()
        self.pupil_procs = [env.process(self.pupil()) for j in range(3)]
        self.bell_proc = env.process(self.bell())

    def bell(self):
        """Ring twice: wait 45, trigger ``class_ends``, replace it, print the current time."""
        for i in range(2):
            yield self.env.timeout(45)
            print("hi")
            self.class_ends.succeed()
            self.class_ends = self.env.event()
            # Read the clock from the instance's own environment rather than
            # from a module-level `env` that may not exist or may differ.
            print(self.env.now)

    def pupil(self):
        """Twice: print the pupil marker, wait for ``class_ends``, then print "hello"."""
        for i in range(2):
            print(r' \o/ ', end='' )
            # Yielding suspends the pupil here until the event is handled.
            yield self.class_ends
            print("hello")
env = simpy.Environment()
school = School(env)  # the constructor registers the pupil and bell processes
env.run()  # no `until` is given here

#process as event
def sub(env):
    """Child process: wait one time unit, then finish with the return value 23."""
    yield env.timeout(1)
    return 23

def parent(env):
    """Parent process: start ``sub`` as a process, wait for it, and pass on its result."""
    # Yielding the child process waits for it to finish; what the yield
    # expression evaluates to is bound to `ret`.
    ret = yield env.process(sub(env))
    return ret


# Waiting for multiple events at once
from simpy.events import AnyOf, AllOf, Event
events = [Event(env) for i in range(3)]
a = AnyOf(env, events)  # Triggers if at least one of "events" is triggered.
b = AllOf(env, events)  # Triggers if each of "events" is triggered.

# Built this way, the `events` list is the group of events the condition waits on.

限制了函数的执行


data-clean
http://example.com/2023/09/16/data-clean/
作者
NGC6302
发布于
2023年9月16日
许可协议